package gl.java.network.transport.kcp.example.rtt;

import gl.java.network.transport.kcp.core.UkcpChannel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Client-side handler for the KCP round-trip-time (RTT) example.
 *
 * <p>When the channel becomes active, a single-threaded scheduler starts writing
 * one RTT message every {@link KcpRttClient#RTT_INTERVAL} milliseconds, each
 * carrying an incrementing index and the current wall-clock time. Every message
 * decoded from an inbound buffer has its elapsed time
 * ({@code now - rtt.time}) appended to an {@link RttWindow}.
 */
public class KcpRttClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(KcpRttClientHandler.class);

    /** Number of probes to send; when not positive the sender never stops on its own. */
    private final int maxCount;

    /** Index of the most recently sent probe. */
    private final AtomicInteger index = new AtomicInteger();

    /** Runs the periodic sender task; shut down in {@link #channelInactive}. */
    private final ScheduledExecutorService scheduleSrv;

    /**
     * Handle of the periodic sender task. It is assigned by the thread that calls
     * {@link #channelActive} and read by the scheduler thread, hence {@code volatile}.
     */
    private volatile ScheduledFuture<?> future = null;

    /** Window that receives the measured round-trip times; created in {@link #channelActive}. */
    RttWindow w;

    /**
     * Creates a client-side handler.
     *
     * @param count number of probes to send before the sender stops itself;
     *              a value that is not positive disables the limit
     */
    public KcpRttClientHandler(int count) {
        this.maxCount = count;
        scheduleSrv = Executors.newSingleThreadScheduledExecutor();
    }

    /**
     * Starts the periodic sender and creates the result window.
     *
     * @param ctx context used by the sender task to write and flush messages
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        log.info(" channel active");
        future = scheduleSrv.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                int seq = index.incrementAndGet();
                ctx.write(Rtt.encodeRttMsg(seq, System.currentTimeMillis()));
                log.info("write index:{}", seq);
                if (maxCount > 0 && seq >= maxCount) {
                    // finish: queue the closing message (index -1) BEFORE cancelling, and
                    // cancel without interruption. This code runs inside the task being
                    // cancelled, so cancel(true) would interrupt the very thread that
                    // still has to write and flush.
                    ctx.write(Rtt.encodeRttMsg(-1, System.currentTimeMillis()));
                    ScheduledFuture<?> self = future;
                    if (self != null) {
                        self.cancel(false);
                    }
                }
                ctx.flush();
            }
        }, KcpRttClient.RTT_INTERVAL, KcpRttClient.RTT_INTERVAL, TimeUnit.MILLISECONDS);
        w = new RttWindow();
        w.setTitle("Kcp");
    }

    /**
     * Stops the scheduler and waits up to one second for it to terminate.
     *
     * @param ctx context of the channel that became inactive
     * @throws Exception if the wait for scheduler termination is interrupted
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        scheduleSrv.shutdown();
        scheduleSrv.awaitTermination(1, TimeUnit.SECONDS);
    }

    /**
     * Decodes every RTT message contained in the inbound buffer and records its
     * elapsed time in the window.
     *
     * @param ctx context of the channel the buffer was read from
     * @param msg inbound message; cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        try {
            log.info("recv.size: {}", buf.readableBytes());
            Rtt rtt = Rtt.decodeRttMsg(buf);
            while (rtt != null) {
                w.append(System.currentTimeMillis() - rtt.time);
                w.repaint();
                rtt = Rtt.decodeRttMsg(buf);
            }
        } finally {
            // The buffer is not forwarded or stored, so it must always be released here,
            // including when undecodable trailing bytes remain or decoding throws.
            buf.release();
        }
    }

    /**
     * Logs the failure and closes the connection.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        log.error("exceptionCaught", cause);
        ctx.close();
    }

}
